package com.bigdata.wsr.task;

import com.bigdata.wsr.deserialize.JsonDebeziumDeserializationSchema;
import com.bigdata.wsr.filter.TableDataFilter;
import com.bigdata.wsr.map.CdcDataMap;
import com.bigdata.wsr.process.Process;
import com.bigdata.wsr.utils.TheTableListSyncUtil;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Synchronization task that reads change data from the MySQL "account" source
 * through a CDC source and writes each configured table into Doris.
 *
 * <p>Connection settings are read from {@code mysql_137_account.properties} and
 * {@code doris_account.properties} when the instance is created.
 */
public class AccountTask implements Process {
    // MySQL connection settings (mysql_137_account.properties).
    private Config mysqlConfig = ConfigFactory.load("mysql_137_account.properties");
    private String mysqlHostname = mysqlConfig.getString("mysql.hostname");
    private int mysqlPort = mysqlConfig.getInt("mysql.port");
    private String mysqlUser = mysqlConfig.getString("mysql.user");
    private String mysqlPassword = mysqlConfig.getString("mysql.password");
    private String database = mysqlConfig.getString("mysql.database");
    private String tables = mysqlConfig.getString("mysql.tables");

    // Doris connection settings (doris_account.properties).
    private Config dorisConfig = ConfigFactory.load("doris_account.properties");
    private String dorisServer = dorisConfig.getString("doris.server");
    private String dorisUsername = dorisConfig.getString("doris.username");
    private String dorisPassword = dorisConfig.getString("doris.password");
    private String dorisDatabase = dorisConfig.getString("doris.database");

    // Step 1: resolve the tables to synchronize from the configuration.
    // Declaration order matters: database, tables and tableList must be
    // initialized before dbTableList, which is computed from them.
    // NOTE(review): getDbTableList presumably fills tableList with the bare table
    // names and returns the database-qualified list for the CDC source — confirm
    // against TheTableListSyncUtil.
    List<String> tableList = new ArrayList<>();
    String dbTableList = TheTableListSyncUtil.getDbTableList(database, tables, tableList);

    /**
     * Wires the MySQL CDC source to one Doris sink per configured table.
     *
     * @param env         execution environment the pipeline is registered on; its
     *                    parallelism is set to {@code parallelism}
     * @param ServerId    server id handed to the MySQL CDC source
     * @param parallelism parallelism applied to the environment
     */
    @Override
    public void process(StreamExecutionEnvironment env, String ServerId, int parallelism) {
        // Step 3: source — read the configured MySQL tables.
        MySqlSource<String> cdcSource = buildMySqlSource(ServerId);

        env.setParallelism(parallelism);
        SingleOutputStreamOperator<String> changeStream = env
                .fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "mysql source")
                .map(new CdcDataMap());

        // One Properties instance is shared by every sink's execution options.
        Properties streamLoadProps = buildStreamLoadProperties();

        // Step 4: split the stream by table name and sink each branch into Doris
        // (deletes are synchronized as well).
        for (String tableName : tableList) {
            SingleOutputStreamOperator<String> tableStream =
                    changeStream.filter(new TableDataFilter(tableName));
            tableStream
                    .addSink(DorisSink.sink(
                            buildExecutionOptions(streamLoadProps),
                            buildDorisOptions(tableName)))
                    .name(tableName);
        }
    }

    /** Builds the MySQL CDC source for the configured database and tables. */
    private MySqlSource<String> buildMySqlSource(String serverId) {
        return MySqlSource.<String>builder()
                .hostname(mysqlHostname)
                .port(mysqlPort)
                .username(mysqlUser)
                .password(mysqlPassword)
                .databaseList(database)
                .tableList(dbTableList)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverId(serverId)
                // Enable scanning of newly added tables.
                .scanNewlyAddedTableEnabled(true)
                // Only DML events are consumed; DDL (schema changes) is ignored.
                .includeSchemaChanges(false)
                // Startup mode: selects between full and incremental synchronization.
                .startupOptions(StartupOptions.initial())
                .build();
    }

    /** Creates the Stream Load properties passed to the Doris sinks. */
    private Properties buildStreamLoadProperties() {
        Properties props = new Properties();
        props.setProperty("format", "json");
        props.setProperty("strip_outer_array", "true");
        return props;
    }

    /** Builds the Doris execution options (batching, retries, delete handling). */
    private DorisExecutionOptions buildExecutionOptions(Properties streamLoadProps) {
        return DorisExecutionOptions.builder()
                .setBatchSize(1000)
                .setBatchIntervalMs(1000L)
                .setMaxRetries(3)
                .setStreamLoadProp(streamLoadProps)
                .setEnableDelete(true)
                .build();
    }

    /** Builds the Doris connection options targeting {@code dorisDatabase.tableName}. */
    private DorisOptions buildDorisOptions(String tableName) {
        String tableIdentifier = String.format("%s.%s", dorisDatabase, tableName);
        return DorisOptions.builder()
                .setFenodes(dorisServer)
                .setUsername(dorisUsername)
                .setPassword(dorisPassword)
                .setTableIdentifier(tableIdentifier)
                .build();
    }
}
